/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tongtech.cnmq.handlers.kot;

import static com.google.common.base.Preconditions.checkState;
import static com.tongtech.cnmq.handlers.kot.KotServerStats.SERVER_SCOPE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import com.tongtech.cnmq.handlers.kot.coordinator.group.GroupConfig;
import com.tongtech.cnmq.handlers.kot.coordinator.group.GroupCoordinator;
import com.tongtech.cnmq.handlers.kot.coordinator.group.OffsetConfig;
import com.tongtech.cnmq.handlers.kot.coordinator.transaction.TransactionConfig;
import com.tongtech.cnmq.handlers.kot.coordinator.transaction.TransactionCoordinator;
import com.tongtech.cnmq.handlers.kot.http.HttpChannelInitializer;
import com.tongtech.cnmq.handlers.kot.migration.MigrationManager;
import com.tongtech.cnmq.handlers.kot.schemaregistry.SchemaRegistryChannelInitializer;
import com.tongtech.cnmq.handlers.kot.stats.PrometheusMetricsProvider;
import com.tongtech.cnmq.handlers.kot.stats.StatsLogger;
import com.tongtech.cnmq.handlers.kot.storage.MemoryProducerStateManagerSnapshotBuffer;
import com.tongtech.cnmq.handlers.kot.storage.ProducerStateManagerSnapshotBuffer;
import com.tongtech.cnmq.handlers.kot.storage.ReplicaManager;
import com.tongtech.cnmq.handlers.kot.utils.ConfigurationUtils;
import com.tongtech.cnmq.handlers.kot.utils.KotTopic;
import com.tongtech.cnmq.handlers.kot.utils.MetadataUtils;
import com.tongtech.cnmq.handlers.kot.utils.delayed.DelayedOperation;
import com.tongtech.cnmq.handlers.kot.utils.delayed.DelayedOperationPurgatory;
import com.tongtech.cnmq.handlers.kot.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import com.tongtech.bookkeeper.common.util.OrderedExecutor;
import com.tongtech.bookkeeper.common.util.OrderedScheduler;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import com.tongtech.cnmq.broker.CnmqServerException;
import com.tongtech.cnmq.broker.ServiceConfiguration;
import com.tongtech.cnmq.broker.protocol.ProtocolHandler;
import com.tongtech.cnmq.broker.service.BrokerService;
import com.tongtech.cnmq.client.admin.CnmqAdmin;
import com.tongtech.cnmq.client.admin.CnmqAdminException;
import com.tongtech.cnmq.common.naming.NamespaceName;
import com.tongtech.cnmq.common.naming.TopicName;
import com.tongtech.cnmq.common.policies.data.ClusterData;
import com.tongtech.cnmq.common.util.FutureUtil;

/**
 * Kafka Protocol Handler (KoT) loaded and run by the Cnmq Service.
 *
 * <p>Lifecycle (driven by the broker): {@link #initialize(ServiceConfiguration)} parses and
 * validates the Kafka-specific configuration, {@link #start(BrokerService)} wires up admin/lookup
 * clients, coordinators and ownership listeners, {@link #newChannelInitializers()} builds the
 * per-endpoint Netty initializers, and {@link #close()} tears everything down.
 *
 * <p>Group and transaction coordinators are created lazily, one per tenant, via
 * {@link #getGroupCoordinator(String)} and {@link #getTransactionCoordinator(String)}.
 */
@Slf4j
public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManager {

    public static final String PROTOCOL_NAME = "kafka";
    public static final String TLS_HANDLER = "tls";
    @Getter
    private RequestStats requestStats;
    private PrometheusMetricsProvider statsProvider;
    @Getter
    private KotBrokerLookupManager kotBrokerLookupManager;
    @VisibleForTesting
    @Getter
    private AdminManager adminManager = null;
    private SystemTopicClient txnTopicClient;
    private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
    private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
    private LookupClient lookupClient;

    private KafkaTopicLookupService kafkaTopicLookupService;
    @VisibleForTesting
    @Getter
    private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;

    @Getter
    @VisibleForTesting
    protected SystemTopicClient offsetTopicClient;

    @Getter
    private KafkaServiceConfiguration kafkaConfig;
    private BrokerService brokerService;
    private KafkaTopicManagerSharedState kafkaTopicManagerSharedState;

    @Getter
    private KotEventManager kotEventManager;
    private OrderedScheduler sendResponseScheduler;
    @VisibleForTesting
    @Getter
    private NamespaceBundleOwnershipListenerImpl bundleListener;
    @VisibleForTesting
    @Getter
    private SchemaRegistryManager schemaRegistryManager;
    private MigrationManager migrationManager;
    private ReplicaManager replicaManager;

    // Periodic task advancing the "purge aborted transactions" offsets; null when the
    // transaction coordinator or the purge interval is disabled.
    private ScheduledFuture<?> txUpdatedPurgeAbortedTxOffsetsTimeHandle;

    // Per-tenant coordinators, created lazily (computeIfAbsent) and shut down in close().
    private final Map<String, GroupCoordinator> groupCoordinatorsByTenant = new ConcurrentHashMap<>();
    private final Map<String, TransactionCoordinator> transactionCoordinatorByTenant = new ConcurrentHashMap<>();

    @VisibleForTesting
    @Getter
    private OrderedExecutor recoveryExecutor;

    /**
     * Returns the {@link GroupCoordinator} for the given tenant, creating and booting it on
     * first access.
     */
    @Override
    public GroupCoordinator getGroupCoordinator(String tenant) {
        return groupCoordinatorsByTenant.computeIfAbsent(tenant, this::createAndBootGroupCoordinator);
    }

    @VisibleForTesting
    public Map<String, GroupCoordinator> getGroupCoordinators() {
        return groupCoordinatorsByTenant;
    }

    /**
     * Returns the {@link TransactionCoordinator} for the given tenant, creating and booting it on
     * first access.
     */
    @Override
    public TransactionCoordinator getTransactionCoordinator(String tenant) {
        return transactionCoordinatorByTenant.computeIfAbsent(tenant, this::createAndBootTransactionCoordinator);
    }

    public ReplicaManager getReplicaManager() {
        return replicaManager;
    }

    @Override
    public String protocolName() {
        return PROTOCOL_NAME;
    }

    @Override
    public boolean accept(String protocol) {
        return PROTOCOL_NAME.equalsIgnoreCase(protocol);
    }

    /**
     * Parses/validates the Kafka configuration and initializes the stats provider and the
     * response scheduler. Called by the broker before {@link #start(BrokerService)}.
     *
     * @throws IllegalArgumentException if an entry of {@code kotAllowedNamespaces} is not of the
     *         form {@code tenant/namespace}
     */
    @Override
    public void initialize(ServiceConfiguration conf) throws Exception {
        // init config
        if (conf instanceof KafkaServiceConfiguration) {
            // in unit test, passed in conf will be KafkaServiceConfiguration
            kafkaConfig = (KafkaServiceConfiguration) conf;
        } else {
            // when loaded with CnmqService as NAR, `conf` will be type of ServiceConfiguration
            kafkaConfig = ConfigurationUtils.create(conf.getProperties(), KafkaServiceConfiguration.class);

            // some of the configs value in conf.properties may not updated.
            // So need to get latest value from conf itself
            kafkaConfig.setAdvertisedAddress(conf.getAdvertisedAddress());
            kafkaConfig.setBindAddress(conf.getBindAddress());
        }

        // Validate the namespaces
        for (String fullNamespace : kafkaConfig.getKotAllowedNamespaces()) {
            final String[] tokens = fullNamespace.split("/");
            if (tokens.length != 2) {
                throw new IllegalArgumentException(
                        "Invalid namespace '" + fullNamespace + "' in kotAllowedNamespaces config");
            }
            // Placeholders are resolved against the configured tenant/namespace before validation.
            NamespaceName.validateNamespaceName(
                    tokens[0].replace(KafkaServiceConfiguration.TENANT_PLACEHOLDER, kafkaConfig.getKafkaTenant()),
                    tokens[1].replace("*", kafkaConfig.getKafkaNamespace()));
        }

        statsProvider = new PrometheusMetricsProvider();
        StatsLogger rootStatsLogger = statsProvider.getStatsLogger("");
        requestStats = new RequestStats(rootStatsLogger.scope(SERVER_SCOPE));
        sendResponseScheduler = OrderedScheduler.newSchedulerBuilder()
                .name("send-response")
                .numThreads(kafkaConfig.getNumSendKafkaResponseThreads())
                .build();
    }

    // This method is called after initialize
    @Override
    public String getProtocolDataToAdvertise() {
        String result = kafkaConfig.getKafkaAdvertisedListeners();
        log.info("Advertised addresses for the 'kafka' endpoint: {}", result);
        return result;
    }

    /**
     * Starts the handler: creates admin/lookup/system-topic clients, registers the ownership
     * listener that invalidates caches on unload/delete, optionally boots the system-tenant
     * coordinators, and starts metrics and the schema-registry/migration managers.
     *
     * @throws IllegalStateException if the admin client or lookup manager cannot be created
     */
    @Override
    public void start(BrokerService service) {
        log.info("Starting KafkaProtocolHandler, kot version is: '{}'", KotVersion.getVersion());
        log.info("Git Revision {}", KotVersion.getGitSha());
        log.info("Built by {} on {} at {}",
            KotVersion.getBuildUser(),
            KotVersion.getBuildHost(),
            KotVersion.getBuildTime());

        brokerService = service;
        CnmqAdmin cnmqAdmin;
        try {
            cnmqAdmin = brokerService.getCnmq().getAdminClient();
            adminManager = new AdminManager(cnmqAdmin, kafkaConfig);
        } catch (CnmqServerException e) {
            log.error("Failed to get cnmqAdmin", e);
            throw new IllegalStateException(e);
        }

        lookupClient = new LookupClient(brokerService.cnmq(), kafkaConfig);
        offsetTopicClient = new SystemTopicClient(brokerService.cnmq(), kafkaConfig);
        txnTopicClient = new SystemTopicClient(brokerService.cnmq(), kafkaConfig);

        try {
            kotBrokerLookupManager = new KotBrokerLookupManager(kafkaConfig, brokerService.getCnmq(), lookupClient);
        } catch (Exception ex) {
            log.error("Failed to get kotBrokerLookupManager", ex);
            throw new IllegalStateException(ex);
        }
        kafkaTopicManagerSharedState = new KafkaTopicManagerSharedState(brokerService, kotBrokerLookupManager);

        // Listener for invalidating the global Broker ownership cache
        bundleListener = new NamespaceBundleOwnershipListenerImpl(brokerService);

        bundleListener.addTopicOwnershipListener(new TopicOwnershipListener() {

            @Override
            public void whenUnload(TopicName topicName) {
                invalidateBundleCache(topicName);
                invalidatePartitionLog(topicName);
            }

            @Override
            public void whenDelete(TopicName topicName) {
                invalidateBundleCache(topicName);
                invalidatePartitionLog(topicName);
            }

            @Override
            public boolean interestedInEvent(NamespaceName namespaceName, EventType event) {
                switch (event) {
                    case UNLOAD:
                    case DELETE:
                        return true;
                    default:
                        return false;
                }
            }

            @Override
            public String name() {
                return "CacheInvalidator";
            }

            private void invalidateBundleCache(TopicName topicName) {
                kafkaTopicManagerSharedState.deReference(topicName.toString());
                if (!topicName.isPartitioned()) {
                    // A non-partitioned name may also be cached under its partition-0 form.
                    String nonPartitionedTopicName = topicName.getPartition(0).toString();
                    kafkaTopicManagerSharedState.deReference(nonPartitionedTopicName);
                }
            }

            private void invalidatePartitionLog(TopicName topicName) {
                getReplicaManager().removePartitionLog(topicName.toString());
                if (!topicName.isPartitioned()) {
                    getReplicaManager().removePartitionLog(topicName.getPartition(0).toString());
                }
            }
        });
        bundleListener.register();

        recoveryExecutor = OrderedExecutor
                .newBuilder()
                .name("kafka-tx-recovery")
                .numThreads(kafkaConfig.getKafkaTransactionRecoveryNumThreads())
                .build();

        if (kafkaConfig.isKafkaManageSystemNamespaces()) {
            // initialize default Group Coordinator
            getGroupCoordinator(kafkaConfig.getKafkaMetadataTenant());
        }

        // init KotEventManager
        kotEventManager = new KotEventManager(adminManager,
                brokerService.getCnmq().getLocalMetadataStore(),
                requestStats.getStatsLogger(),
                kafkaConfig,
                groupCoordinatorsByTenant);
        kotEventManager.start();

        if (kafkaConfig.isKafkaTransactionCoordinatorEnabled() && kafkaConfig.isKafkaManageSystemNamespaces()) {
            getTransactionCoordinator(kafkaConfig.getKafkaMetadataTenant());
        }

        Configuration conf = new PropertiesConfiguration();
        conf.addProperty("prometheusStatsLatencyRolloverSeconds",
            kafkaConfig.getKotPrometheusStatsLatencyRolloverSeconds());
        conf.addProperty("cluster", kafkaConfig.getClusterName());
        statsProvider.start(conf);
        brokerService.cnmq().addPrometheusRawMetricsProvider(statsProvider);
        schemaRegistryManager = new SchemaRegistryManager(kafkaConfig, brokerService.getCnmq(),
                brokerService.getAuthenticationService());
        migrationManager = new MigrationManager(kafkaConfig, brokerService.getCnmq());

        if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()
                && kafkaConfig.getKafkaTxnPurgeAbortedTxnIntervalSeconds() > 0) {
            txUpdatedPurgeAbortedTxOffsetsTimeHandle = service.getCnmq().getExecutor().scheduleWithFixedDelay(() -> {
                        getReplicaManager().updatePurgeAbortedTxnsOffsets();
                    },
                    kafkaConfig.getKafkaTxnPurgeAbortedTxnIntervalSeconds(),
                    kafkaConfig.getKafkaTxnPurgeAbortedTxnIntervalSeconds(),
                    TimeUnit.SECONDS);
        }
    }

    /**
     * Creates, boots and returns the {@link TransactionCoordinator} for {@code tenant}, wiring a
     * listener that drives transaction-state immigration/emigration when the transaction metadata
     * topic is loaded/unloaded.
     *
     * @throws IllegalStateException if coordinator initialization fails
     */
    private TransactionCoordinator createAndBootTransactionCoordinator(String tenant) {
        log.info("createAndBootTransactionCoordinator {}", tenant);
        final ClusterData clusterData = ClusterData.builder()
                .serviceUrl(brokerService.getCnmq().getWebServiceAddress())
                .serviceUrlTls(brokerService.getCnmq().getWebServiceAddressTls())
                .brokerServiceUrl(brokerService.getCnmq().getBrokerServiceUrl())
                .brokerServiceUrlTls(brokerService.getCnmq().getBrokerServiceUrlTls())
                .build();
        try {
            TransactionCoordinator transactionCoordinator =
                    initTransactionCoordinator(tenant, brokerService.getCnmq().getAdminClient(), clusterData);
            // Listening transaction topic load/unload
            final NamespaceName kafkaMetaNs = NamespaceName.get(tenant, kafkaConfig.getKafkaMetadataNamespace());
            final String metadataNamespace = kafkaConfig.getKafkaMetadataNamespace();
            bundleListener.addTopicOwnershipListener(new TopicOwnershipListener() {
                @Override
                public void whenLoad(TopicName topicName) {
                    if (KotTopic.isTransactionMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        transactionCoordinator.handleTxnImmigration(topicName.getPartitionIndex());
                    }
                }

                @Override
                public void whenUnload(TopicName topicName) {
                    if (KotTopic.isTransactionMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        transactionCoordinator.handleTxnEmigration(topicName.getPartitionIndex());
                    }
                }

                @Override
                public String name() {
                    return "TransactionStateRecover-" + transactionCoordinator.getTopicPartitionName();
                }

                @Override
                public boolean interestedInEvent(NamespaceName namespaceName, EventType event) {
                    switch (event) {
                        case LOAD:
                        case UNLOAD:
                            return namespaceName.equals(kafkaMetaNs);
                        default:
                            return false;
                    }
                }
            });
            return transactionCoordinator;
        } catch (Exception e) {
            log.error("Initialized transaction coordinator failed.", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Creates, boots and returns the {@link GroupCoordinator} for {@code tenant}: ensures the
     * offsets metadata exists, starts the coordinator, and registers a listener that drives group
     * immigration/emigration when offset topic partitions are loaded/unloaded.
     *
     * @throws IllegalStateException if offset metadata creation or coordinator startup fails
     */
    private GroupCoordinator createAndBootGroupCoordinator(String tenant) {
        log.info("createAndBootGroupCoordinator {}", tenant);
        final ClusterData clusterData = ClusterData.builder()
                .serviceUrl(brokerService.getCnmq().getWebServiceAddress())
                .serviceUrlTls(brokerService.getCnmq().getWebServiceAddressTls())
                .brokerServiceUrl(brokerService.getCnmq().getBrokerServiceUrl())
                .brokerServiceUrlTls(brokerService.getCnmq().getBrokerServiceUrlTls())
                .build();

        GroupCoordinator groupCoordinator;
        try {
            MetadataUtils.createOffsetMetadataIfMissing(tenant, brokerService.getCnmq().getAdminClient(),
                    clusterData, kafkaConfig);

            // init and start group coordinator
            groupCoordinator = startGroupCoordinator(tenant, offsetTopicClient);

            // and listener for Offset topics load/unload
            final NamespaceName kafkaMetaNs = NamespaceName.get(tenant, kafkaConfig.getKafkaMetadataNamespace());
            final String metadataNamespace = kafkaConfig.getKafkaMetadataNamespace();
            bundleListener.addTopicOwnershipListener(new TopicOwnershipListener() {
                @Override
                public void whenLoad(TopicName topicName) {
                    if (KotTopic.isGroupMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        groupCoordinator.handleGroupImmigration(topicName.getPartitionIndex());
                    }
                }

                @Override
                public void whenUnload(TopicName topicName) {
                    if (KotTopic.isGroupMetadataTopicName(topicName.toString(), metadataNamespace)) {
                        groupCoordinator.handleGroupEmigration(topicName.getPartitionIndex());
                    }
                }

                @Override
                public String name() {
                    return "OffsetAndTopicListener-" + groupCoordinator.getGroupManager().getTopicPartitionName();
                }

                @Override
                public boolean interestedInEvent(NamespaceName namespaceName, EventType event) {
                    switch (event) {
                        case LOAD:
                        case UNLOAD:
                            return namespaceName.equals(kafkaMetaNs);
                        default:
                            return false;
                    }
                }

            });
        } catch (Exception e) {
            log.error("Failed to create offset metadata", e);
            throw new IllegalStateException(e);
        }

        // init kafka namespaces
        try {
            MetadataUtils.createKafkaNamespaceIfMissing(brokerService.getCnmq().getAdminClient(),
                    clusterData, kafkaConfig);
        } catch (Exception e) {
            // no need to throw exception since we can create kafka namespace later
            log.warn("init kafka failed, need to create it manually later", e);
        }

        return groupCoordinator;
    }

    /** Builds the Netty channel initializer for one Kafka listener endpoint. */
    private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoint) {
        return new KafkaChannelInitializer(
                brokerService.getCnmq(),
                kafkaConfig,
                this,
                replicaManager,
                kotBrokerLookupManager,
                adminManager,
                producePurgatory,
                fetchPurgatory,
                endPoint.isTlsEnabled(),
                endPoint,
                kafkaConfig.isSkipMessagesWithoutIndex(),
                requestStats,
                sendResponseScheduler,
                kafkaTopicManagerSharedState,
                kafkaTopicLookupService,
                lookupClient);
    }

    /**
     * Resolves the per-tenant {@link ProducerStateManagerSnapshotBuffer}: an in-memory buffer when
     * the transaction coordinator is disabled, otherwise the coordinator-backed one.
     */
    class ProducerStateManagerSnapshotProvider implements Function<String, ProducerStateManagerSnapshotBuffer> {
        @Override
        public ProducerStateManagerSnapshotBuffer apply(String tenant) {
            if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
                return new MemoryProducerStateManagerSnapshotBuffer();
            }
            return getTransactionCoordinator(tenant)
                    .getProducerStateManagerSnapshotBuffer();
        }
    }

    private final Function<String, ProducerStateManagerSnapshotBuffer> getProducerStateManagerSnapshotBufferByTenant =
            new ProducerStateManagerSnapshotProvider();

    // this is called after initialize, and with kafkaConfig, brokerService all set.
    @Override
    public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
        checkState(kafkaConfig != null);
        checkState(brokerService != null);

        producePurgatory = DelayedOperationPurgatory.builder()
                .purgatoryName("produce")
                .timeoutTimer(SystemTimer.builder().executorName("produce").build())
                .build();
        fetchPurgatory = DelayedOperationPurgatory.builder()
                .purgatoryName("fetch")
                .timeoutTimer(SystemTimer.builder().executorName("fetch").build())
                .build();

        kafkaTopicLookupService = new KafkaTopicLookupService(brokerService, kotBrokerLookupManager);

        replicaManager = new ReplicaManager(
                kafkaConfig,
                requestStats,
                Time.SYSTEM,
                brokerService.getEntryFilterProvider().getBrokerEntryFilters(),
                producePurgatory,
                fetchPurgatory,
                kafkaTopicLookupService,
                getProducerStateManagerSnapshotBufferByTenant,
                recoveryExecutor
        );

        try {
            ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
                    ImmutableMap.builder();

            EndPoint.parseListeners(kafkaConfig.getListeners(), kafkaConfig.getKafkaProtocolMap()).
                    forEach((listener, endPoint) ->
                            builder.put(endPoint.getInetAddress(), newKafkaChannelInitializer(endPoint))
                    );

            Optional<HttpChannelInitializer> migrationChannelInitializer = migrationManager.build();
            migrationChannelInitializer.ifPresent(
                    initializer -> builder.put(migrationManager.getAddress(),
                            initializer));

            Optional<SchemaRegistryChannelInitializer> schemaRegistryChannelInitializer = schemaRegistryManager.build();
            schemaRegistryChannelInitializer.ifPresent(
                    registryChannelInitializer -> builder.put(schemaRegistryManager.getAddress(),
                            registryChannelInitializer));
            channelInitializerMap = builder.build();
            return channelInitializerMap;
        } catch (Exception e) {
            // NOTE(review): returning null on failure is part of the existing contract with the
            // broker loader — kept as-is; confirm callers tolerate it before changing.
            log.error("KafkaProtocolHandler newChannelInitializers failed with ", e);
            return null;
        }
    }

    /**
     * Shuts down all components started by {@link #start(BrokerService)}.
     *
     * <p>Internal Cnmq clients are closed asynchronously and awaited with a bounded timeout so a
     * hung client cannot block broker shutdown forever
     * (see https://github.com/apache/cnmq/issues/19579).
     */
    @Override
    public void close() {
        if (txUpdatedPurgeAbortedTxOffsetsTimeHandle != null) {
            txUpdatedPurgeAbortedTxOffsetsTimeHandle.cancel(false);
        }

        if (producePurgatory != null) {
            producePurgatory.shutdown();
        }
        if (fetchPurgatory != null) {
            fetchPurgatory.shutdown();
        }
        groupCoordinatorsByTenant.values().forEach(GroupCoordinator::shutdown);
        kotEventManager.close();
        if (schemaRegistryManager != null) {
            schemaRegistryManager.close();
        }
        transactionCoordinatorByTenant.values().forEach(TransactionCoordinator::shutdown);
        kafkaTopicManagerSharedState.close();
        kotBrokerLookupManager.close();
        statsProvider.stop();
        sendResponseScheduler.shutdown();

        if (adminManager != null) {
            adminManager.shutdown();
        }
        recoveryExecutor.shutdown();

        // Close the internal clients asynchronously; the futures are awaited (with a timeout)
        // below, so the clients are not also closed synchronously here.
        List<CompletableFuture<?>> closeHandles = new ArrayList<>();
        if (offsetTopicClient != null) {
            closeHandles.add(offsetTopicClient.closeAsync());
        }
        if (txnTopicClient != null) {
            closeHandles.add(txnTopicClient.closeAsync());
        }
        if (lookupClient != null) {
            closeHandles.add(lookupClient.closeAsync());
        }

        // do not block the broker forever
        // see https://github.com/apache/cnmq/issues/19579
        try {
            FutureUtil
                    .waitForAll(closeHandles)
                    .get(Math.max(kafkaConfig.getBrokerShutdownTimeoutMs() / 10, 1000),
                            TimeUnit.MILLISECONDS);
        } catch (ExecutionException err) {
            log.warn("Error while closing some of the internal CnmqClients", err.getCause());
        } catch (TimeoutException err) {
            log.warn("Could not stop all the internal CnmqClients within the configured timeout");
        } catch (InterruptedException err) {
            Thread.currentThread().interrupt();
            log.warn("Could not stop all the internal CnmqClients");
        }
    }

    /**
     * Creates and starts the group coordinator for {@code tenant}, reading the offsets-topic
     * partition count from the admin client.
     *
     * @throws IllegalStateException if the offsets topic metadata cannot be read or the topic is
     *         non-partitioned
     */
    @VisibleForTesting
    protected GroupCoordinator startGroupCoordinator(String tenant, SystemTopicClient client) {
        GroupConfig groupConfig = new GroupConfig(
            kafkaConfig.getGroupMinSessionTimeoutMs(),
            kafkaConfig.getGroupMaxSessionTimeoutMs(),
            kafkaConfig.getGroupInitialRebalanceDelayMs()
        );

        String topicName = tenant + "/" + kafkaConfig.getKafkaMetadataNamespace()
                + "/" + Topic.GROUP_METADATA_TOPIC_NAME;

        CnmqAdmin cnmqAdmin;
        int offsetTopicNumPartitions;
        try {
            cnmqAdmin = brokerService.getCnmq().getAdminClient();
            offsetTopicNumPartitions = cnmqAdmin.topics().getPartitionedTopicMetadata(topicName).partitions;
            if (offsetTopicNumPartitions == 0) {
                log.error("Offset topic should not be a non-partitioned topic.");
                throw new IllegalStateException("Offset topic should not be a non-partitioned topic.");
            }
        } catch (CnmqServerException | CnmqAdminException e) {
            log.error("Failed to get offset topic partition metadata .", e);
            throw new IllegalStateException(e);
        }

        String namespacePrefixForMetadata = MetadataUtils.constructMetadataNamespace(tenant, kafkaConfig);

        OffsetConfig offsetConfig = OffsetConfig.builder()
            .offsetsTopicName(topicName)
            .offsetsTopicNumPartitions(offsetTopicNumPartitions)
            .offsetsTopicCompressionType(CompressionType.valueOf(kafkaConfig.getOffsetsTopicCompressionCodec()))
            .maxMetadataSize(kafkaConfig.getOffsetMetadataMaxSize())
            .offsetsRetentionCheckIntervalMs(kafkaConfig.getOffsetsRetentionCheckIntervalMs())
            .offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
            .offsetCommitTimeoutMs(kafkaConfig.getOffsetCommitTimeoutMs())
            .build();

        GroupCoordinator groupCoordinator = GroupCoordinator.of(
            tenant,
            client,
            groupConfig,
            offsetConfig,
            namespacePrefixForMetadata,
            SystemTimer.builder()
                .executorName("group-coordinator-timer")
                .build(),
            Time.SYSTEM
        );
        // always enable metadata expiration
        groupCoordinator.startup(true);

        return groupCoordinator;
    }

    /**
     * Builds the transaction configuration, ensures transaction metadata topics exist, then
     * creates and starts (blocking) the transaction coordinator for {@code tenant}.
     *
     * @throws Exception if metadata creation or coordinator startup fails
     */
    public TransactionCoordinator initTransactionCoordinator(String tenant, CnmqAdmin cnmqAdmin,
                                                             ClusterData clusterData) throws Exception {
        TransactionConfig transactionConfig = TransactionConfig.builder()
                .transactionLogNumPartitions(kafkaConfig.getKafkaTxnLogTopicNumPartitions())
                .transactionMetadataTopicName(MetadataUtils.constructTxnLogTopicBaseName(tenant, kafkaConfig))
                .transactionProducerIdTopicName(MetadataUtils.constructTxnProducerIdTopicBaseName(tenant, kafkaConfig))
                .transactionProducerStateSnapshotTopicName(MetadataUtils.constructTxProducerStateTopicBaseName(tenant,
                        kafkaConfig))
                .producerStateTopicNumPartitions(kafkaConfig.getKafkaTxnProducerStateTopicNumPartitions())
                .abortTimedOutTransactionsIntervalMs(kafkaConfig.getKafkaTxnAbortTimedOutTransactionCleanupIntervalMs())
                .transactionalIdExpirationMs(kafkaConfig.getKafkaTransactionalIdExpirationMs())
                .removeExpiredTransactionalIdsIntervalMs(
                        kafkaConfig.getKafkaTransactionsRemoveExpiredTransactionalIdCleanupIntervalMs())
                .brokerId(kafkaConfig.getKafkaBrokerId())
                .build();

        MetadataUtils.createTxnMetadataIfMissing(tenant, cnmqAdmin, clusterData, kafkaConfig);

        TransactionCoordinator transactionCoordinator = TransactionCoordinator.of(
                tenant,
                kafkaConfig,
                transactionConfig,
                txnTopicClient,
                brokerService.getCnmq().getLocalMetadataStore(),
                kotBrokerLookupManager,
                OrderedScheduler
                        .newSchedulerBuilder()
                        .name("transaction-log-manager-" + tenant)
                        .numThreads(1)
                        .build(),
                Time.SYSTEM,
                recoveryExecutor);

        // Block until startup (including any recovery) completes so callers get a ready coordinator.
        transactionCoordinator.startup(kafkaConfig.isKafkaTransactionalIdExpirationEnable()).get();

        return transactionCoordinator;
    }
}
